package com.storm.demo.count;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import java.util.Collections;



public class WordCountTopologyTask {

    /** Zookeeper connection string for the Kafka brokers (host:port[,host:port,...]). */
    private static final String BROKERZKSTR = "";

    /** Kafka topic the spout subscribes to. */
    private static final String TOPIC = "";

    /** Zookeeper host used by the Kafka spout to store consumer offsets. */
    private static final String HOST = "";

    /** Zookeeper port as a string; parsed with Integer.parseInt in createKafka(). */
    private static final String PORT = "";

    /**
     * Builds the word-count topology (spout -> split bolt -> count bolt) and
     * submits it to an in-process {@link LocalCluster}.
     *
     * @param args unused
     */
    public static void main(String[] args){
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        // Spout emits raw lines, the split bolt tokenizes them, and the count
        // bolt aggregates. fieldsGrouping on "word" routes every occurrence of
        // the same word to the same counting task, keeping per-word totals correct.
        topologyBuilder.setSpout("mySpout", new MySpout(), 1);
        topologyBuilder.setBolt("myBolt1", new MysplitBolt(), 10).shuffleGrouping("mySpout");
        topologyBuilder.setBolt("myBolt2", new MyCountBolt(), 2).fieldsGrouping("myBolt1", new Fields("word"));

        Config config = new Config();
        /*
         * Number of worker slots this topology claims on the Storm cluster; each
         * slot corresponds to one worker process on a supervisor node. If more
         * slots are requested than the cluster has free, the topology submits
         * successfully but does not actually run until other topologies are
         * killed and their slots released.
         */
        config.setNumWorkers(3);

        // Two submission modes: remote cluster (StormSubmitter) or in-process local mode.
//        StormSubmitter.submitTopology("mywordcount", config, topologyBuilder.createTopology());

        // NOTE(review): the LocalCluster is never shut down, so the JVM keeps
        // running until killed. That is common for demos, but call
        // localCluster.shutdown() when a clean exit is needed.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("mywordcount", config, topologyBuilder.createTopology());
    }

    /** Stub kept for API compatibility; intentionally empty. */
    public static void createTopology() {

    }

    /**
     * Configures (but does not start) a Kafka spout reading {@link #TOPIC}.
     * Broker metadata is discovered via Zookeeper at {@link #BROKERZKSTR};
     * consumer offsets are stored in Zookeeper at {@link #HOST}:{@link #PORT}
     * under the "/storm" root with consumer id "s32".
     */
    public static void createKafka() {
        // Discover Kafka brokers through Zookeeper.
        BrokerHosts brokerHosts = new ZkHosts(BROKERZKSTR);
        // Topic to subscribe to, plus the ZK root path and consumer id under
        // which this spout persists its read offsets.
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, TOPIC, "/storm", "s32");
        // Deserialize each Kafka message as a plain string tuple.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.zkServers = Collections.singletonList(HOST);
        spoutConfig.zkPort = Integer.parseInt(PORT);
        // Start consuming from the latest Kafka offset. The previous code used
        // System.nanoTime(), which is a monotonic counter, not a valid Kafka
        // offset timestamp; the "latest" sentinel is -1L, i.e. the value of
        // kafka.api.OffsetRequest.LatestTime().
        spoutConfig.startOffsetTime = -1L;

        // Example usage (spout would be registered on a TopologyBuilder):
//        KafkaSpout receiver = new KafkaSpout(spoutConfig);
    }

}
